Contents
  1. 1. 线程池原理
    1. 1.1. 线程池是如何创建的
  2. 2. 线程池的状态
  3. 3. 新任务到达,线程池如何处理
  4. 4. 创建线程池的addWorker()方法
  5. 5. 线程池最核心的方法runWorker()方法
  6. 6. 线程池关闭
  7. 7. 线程池的种类
    1. 7.1. newFixedThreadPool
    2. 7.2. newCachedThreadPool
    3. 7.3. newSingleThreadExecutor
  8. 8. SpringBoot中的线程池
  9. 9. 参考链接

池化最核心的思想就是把宝贵的资源放到池子里,每次使用都从里面获取,用完之后又放回池子供其他人使用。

线程池的好处:

  1. 消除了频繁创建和消亡线程的系统资源开销
  2. 面对过量任务的提交能够平缓地劣化,提高响应速度
  3. 提高线程的可管理性

《Java开发手册》也有这么一条:

以下源码为jdk1.8.0_171

线程池原理

JDK主要提供了Executor框架来使用线程池,它是线程池的基础,提供了一种将任务提交与任务执行分开的解耦机制。

线程池是如何创建的

先来看看ThreadPoolExecutor的构造函数

创建一个线程一般需要输入几个参数:

  • corePoolSize:线程池的基本大小,当提交一个任务到线程池,若需要执行的任务没有超过线程池的基本大小,即使当前有其他空闲的基本线程,就创建一个新的线程;
  • maximumPoolSize:线程池允许的最大线程数,若使用了无界队列此参数就没什么效果;
  • keepAliveTime:线程活动保持时间,即线程池的工作线程空闲后保持存活的时间;
  • unit:线程活动保持时间的单位;
  • workQueue:任务队列,用于保存等待执行的任务的阻塞队列;
  • threadFactory:用于设置创建线程的工厂,可通过它给创建出来的线程设置更有意义的名字,默认的DefaultThreadFactory名字为namePrefix = "pool-" + poolNumber.getAndIncrement() +"-thread-" + threadNumber.getAndIncrement()
  • handler:饱和策略,当任务队列和线程池都满了的时候,必须采取一种策略处理新的任务,默认为AbortPolicy,无法处理新任务时抛出异常
    • AbortPolicy:直接拒绝所提交的任务,抛出RejectedExecutionException异常
    • CallerRunsPolicy:只用调用者所在线程来执行任务
    • DiscardPolicy:不处理直接丢弃掉任务
    • DiscardOldestPolicy:丢弃掉阻塞队列中存放时间的任务并重试execute()

线程池的状态

定义一个AtomicInteger类型的ctl来标识线程池的状态,高三位表示「线程池状态」,低三位表示「线程池中的任务数量」。

线程池状态标识意义:

  • RUNNING (111):能接收新任务并且处理在队列中的任务
  • SHUTDOWN (000):不接收新任务,只能处理已经提交的任务
  • STOP (001):不接收新任务,也不处理已提交的任务,并中断正在处理的任务
  • TIDYING (010):当所有的任务已终止,ctl记录的“任务数量”为0,线程池会变为TIDYING状态,且执行钩子函数terminated(),若想在线程池变为此状态时添加处理逻辑,可重载terminated()方法
  • TERMINATED(011):terminated()方法执行完毕,线程池彻底终止

状态转移如下图:

新任务到达,线程池如何处理

当提交一个新任务到线程池时,会经历如下步骤:

  1. 如果当前运行的线程少于corePoolSize,则会创建新的线程来执行新的任务;
  2. 如果运行的线程个数等于或者大于corePoolSize,则会将提交的任务存放到阻塞队列workQueue中;
  3. 如果当前workQueue队列已满的话,则会创建新的线程来执行任务(执行这一步需要获取全局锁);
  4. 如果线程个数已经超过了maximumPoolSize,则会使用饱和策略RejectedExecutionHandler来进行处理。

提交任务的核心函数为execute(),其处理逻辑为:

execute()源码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void execute(Runnable command) {
//如果没有任务,报空指针异常
if (command == null)
throw new NullPointerException();
//获取当前线程池的状态
int c = ctl.get();
//如果工作线程数量小于corePoolSize,创建一个新的线程
if (workerCountOf(c) < corePoolSize) {
//如果添加成功就直接返回
if (addWorker(command, true))
return;
//否则再次获取活动线程数量
c = ctl.get();
}
//否则如果当前线程处于运行状态且写入阻塞队列成功,则进行二次检查
if (isRunning(c) && workQueue.offer(command)) {
//再次对线程池状态检查,因为上面addWorke()过了并且失败了,所以需要检查
int recheck = ctl.get();
//如果线程池不是运行状态,就需要从阻塞队列移除任务,同时执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//如果当前线程池为空(线程池已关闭),则添加一个null到队列中
else if (workerCountOf(recheck) == 0)
//添加一个空的任务
addWorker(null, false);
}
//上述if判断为非运行状态,尝试新建线程,如果失败则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}

创建线程池的addWorker()方法

该方法签名为

1
private boolean addWorker(Runnable firstTask, boolean core)

第一个参数为Runnable类型,表示线程池中某个新提交的任务,第二参数如果为true,创建core核心线程,如果为false,则创建maximumPoolSize线程,这两类线程的生命周期不同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个Worker对象
// 其代理了任务对象,并通过线程工厂创建线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING状态
// 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask是null,向线程池添加线程
// 因为在SHUTDOWN时不会再添加新任务,但还是会执行workQueue中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一个拥有所有线程的HashSet,当获得全局锁才可访问
workers.add(w);
// largestPoolSize记录着线程池中出现过的最大线程数量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;

Worker类的签名及构造方法签名如下:

1
2
3
4
5
6
7
8
//Worker类继承了AQS并实现了Runnable接口
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
// 构造函数
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;

注意:在执行t.start()时,由于this.thread = getThreadFactory().newThread(this)会传入this指针,会间接调用实现了Runnable接口的Worker类的run方法。

Worker类的run方法实现如下:

1
2
3
public void run() {
runWorker(this);
}

线程池最核心的方法runWorker()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//获取第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
//允许中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

总结一下runWorker()的执行过程:

  1. while循环不断地通过getTask()方法获取任务
  2. getTask()方法从阻塞队列中获取任务
  3. 如果线程池正在停止,要保证当前线程是中断状态;如果不是的话,则要保证当前线程不是中断状态
  4. 调用task.run()执行任务
  5. 如果task为null,则跳出循环,执行processWorkerExit()方法
  6. runWorker()方法执行完成,也即Worker中的run()方法,也即addWorker()中的t.start()方法执行完毕,销毁线程

注意:

beforeExecute()afterExecute()在ThreadExecutor中是空的,可以自行扩展功能。

线程池关闭

ThreadPoolExecutor提供了shutdown()shutdownNow()两个方法来关闭线程

shutdown()按照过去任务提交的顺序发起一个有序的关闭,不接受新任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
//加上可重入锁
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); // 设置线程池状态为SHUTDOWN
interruptIdleWorkers(); //中断空闲线程
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock(); //释放锁
}
tryTerminate(); //将线程池状态设置为TERMINATED
}

shutdownNow()也是停止接收新任务,但会中断所有任务,将线程池状态变为TERMINATED

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP); //设置线程池状态为STOP
interruptWorkers(); //中断所有线程
tasks = drainQueue(); //获取等待的任务列表
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks; //返回等待的任务列表
}

区别:

  • 调用shutdown()后,线程池状态立刻变为SHUTDOWN,而调用shutdownNow(),线程池状态立刻变为STOP;
  • shutdown()等待任务执行完才中断线程,而shutdownNow()不等任务执行完就中断了线程。

Tips: 优雅的线程关闭方式:

1
2
3
4
5
6
7
8
9
10
11
12
long start = System.currentTimeMillis();
for (int i = 0; i <= 5; i++) {
pool.execute(new MyTask());
}
pool.shutdown();

while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
LOGGER.info("线程还在执行呢...");
}

long end = System.currentTimeMillis();
LOGGER.info("一共处理了" + (end - start) + "秒");

线程池的种类

  • 固定线程池 ExecutorService service1 = Executors.newFixedThreadPool(5)
  • 单例线程池 ExecutorService service2 = Executors.newSingleThreadExecutor();
  • 缓存线程池 ExecutorService service3 = Executors.newCachedThreadPool();
  • 任务调用线程池 ExecutorService service4 = Executors.newScheduledThreadPool(2);

newFixedThreadPool

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

其中corePoolSize等于maximumPoolSize。

newCachedThreadPool

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

无限线程池,接收到新任务线程池会毫不犹豫地新开一个线程处理。

newSingleThreadExecutor

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

创建单个worker线程的线程池。

SpringBoot中的线程池

需要导入Guava的包

1
2
3
4
5
dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>11.0.2</version>
</dependency>

首先配置线程池:

1
2
3
4
5
6
7
8
9
10
11
12
@Configuration
public class ThreadPoolConfig {
//消费线程队列
@Bean(value="consumerQueueThreadPool")
public ExecutorService buildConsumerQueueThreadPool() {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("consumer-queue-thread-%d").build();
ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
return pool;
}
}

直接使用配置好的消费队列线程池Bean:

1
2
3
4
5
6
7
8
9
10
@Resource(name = "consumerQueueThreadPool")
private ExecutorService consumerQueueThreadPool;
@Override
public void execute() {
//消费队列
for (int i = 0; i < 5; i++) {
consumerQueueThreadPool.execute(new ConsumerQueueThread());
}

}

参考链接

  1. 深入理解Java线程池
  2. 并发编程之线程池的使用及扩展和优化
  3. 剖析线程池实现原理
  4. 如何优雅的使用和理解线程池
  5. 线程池ThreadPoolExecutor实现原理
  6. 《阿里巴巴Java开发手册》
Contents
  1. 1. 线程池原理
    1. 1.1. 线程池是如何创建的
  2. 2. 线程池的状态
  3. 3. 新任务到达,线程池如何处理
  4. 4. 创建线程池的addWorker()方法
  5. 5. 线程池最核心的方法runWorker()方法
  6. 6. 线程池关闭
  7. 7. 线程池的种类
    1. 7.1. newFixedThreadPool
    2. 7.2. newCachedThreadPool
    3. 7.3. newSingleThreadExecutor
  8. 8. SpringBoot中的线程池
  9. 9. 参考链接